[SPARK-24266][K8S][2.4] Restart the watcher when we receive a version changed from k8s#30283
[SPARK-24266][K8S][2.4] Restart the watcher when we receive a version changed from k8s#30283jkleckner wants to merge 3 commits intoapache:branch-2.4from
Conversation
|
@shockdm Please check this over. @dongjoon-hyun Maybe we can get this into 2.4 ? My force-push prevented the old PR from being reopened it seems. |
|
ok to test |
|
Test build #130737 has finished for PR 30283 at commit
|
|
Kubernetes integration test starting |
|
Kubernetes integration test status success |
|
An image of just the spark build was created with this job [1]. We are using this image starting today in our dev cluster. [1] https://gitlab.com/jkleckner/spark/-/pipelines/212962140 |
|
This version of Spark has been running fine since the last 3 days in our cluster. |
| private[k8s] trait LoggingPodStatusWatcher extends Watcher[Pod] { | ||
| def awaitCompletion(): Unit | ||
| def reset(): Unit | ||
| def watchOrStop(sId: String): Boolean |
There was a problem hiding this comment.
Could you preserve the order from the original patches (master/branch-3.0)?
def watchOrStop(submissionId: String): Boolean
def reset(): UnitThe function declaration order is different. The parameter name is not the same here (sId != submissionId)
...ernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala
Show resolved
Hide resolved
| override def onClose(e: KubernetesClientException): Unit = { | ||
| logDebug(s"Stopping watching application $appId with last-observed phase $phase") | ||
| closeWatch() | ||
| logInfo(s"Stopping watching application $appId with last-observed phase $phase") |
There was a problem hiding this comment.
This is a regression because this is previous logDebug. (Of course, this is different from master branch, too).
| logInfo(s"Stopping watching application $appId with last-observed phase $phase") | ||
| if (e != null && e.getCode==HTTP_GONE) { | ||
| resourceTooOldReceived = true | ||
| logInfo(s"Got HTTP Gone code, resource version changed in k8s api: $e") |
| resourceTooOldReceived = true | ||
| logInfo(s"Got HTTP Gone code, resource version changed in k8s api: $e") | ||
| } else { | ||
| logInfo(s"Got proper termination code, closing watcher.") |
| s"Container final statuses:\n\n${containersDescription(p)}" | ||
| }.getOrElse("No containers were found in the driver pod.")) | ||
| } | ||
|
|
There was a problem hiding this comment.
This removal looks like a part of independency PR instead of the part of SPARK-24266. Could you tell us why this is required and where this came from?
There was a problem hiding this comment.
@dongjoon-hyun It does look like it originates from SPARK-28947 02c5b4f which eliminated the future and was a rename.
Since this is a private trait, the logic should be completely self-contained and safe to remove.
There was a problem hiding this comment.
@jkleckner thank you for following up, that is correct. Sorry for the late response :(
| } | ||
|
|
||
| override def watchOrStop(sId: String): Boolean = if (waitForCompletion) { | ||
| logInfo(s"Patched Sept 8th: Waiting for application" + |
| synchronized { | ||
| while (!podCompleted && !resourceTooOldReceived) { | ||
| wait(interval.get) | ||
| logDebug(s"Application status for $appId (phase: $phase)") |
| createdResourcesArgumentCaptor = ArgumentCaptor.forClass(classOf[HasMetadata]) | ||
| when(podOperations.create(FULL_EXPECTED_POD)).thenReturn(POD_WITH_OWNER_REFERENCE) | ||
| when(namedPods.watch(loggingPodStatusWatcher)).thenReturn(mock[Watch]) | ||
| when(loggingPodStatusWatcher.watchOrStop("default" + ":" + POD_NAME)).thenReturn(true) |
There was a problem hiding this comment.
Maybe, kubernetesConf.namespace() instead of "default"?
There was a problem hiding this comment.
Hi, @jkleckner .
This will be an official Apache Spark patch. Could you clean up something like logInfo(s"Patched Sept 8th: and make it consistent across Apache Spark branches?
404bfa4 to
5960d24
Compare
… changed from k8s This is a backport of apache#29533 from master. It includes the shockdm/pull/1 which has been squashed and the import review comment include. It has also been rebased to branch-2.4 Address review comments.
5960d24 to
86f8ee8
Compare
|
@dongjoon-hyun Sorry for doing force-push. I got used to gitlab which keeps track of history better. |
|
Test build #130823 has finished for PR 30283 at commit
|
|
Test build #130825 has finished for PR 30283 at commit
|
|
Retest this please |
|
Test build #130946 has finished for PR 30283 at commit
|
|
Retest this please. |
| val podWithName = kubernetesClient | ||
| .pods() | ||
| .withName(driverPodName) | ||
|
|
There was a problem hiding this comment.
Could you add the following comments like the other branches?
// Reset resource to old before we start the watch, this is important for race conditions
| .withName(driverPodName) | ||
|
|
||
| watcher.reset() | ||
|
|
There was a problem hiding this comment.
Let's remove this empty line like the other branch.
| podCompleted | ||
| } else { | ||
| logInfo(s"Deployed Spark application ${appId} with submission ID $sId into Kubernetes") | ||
| logInfo(s"It seems we end up here, because we never want to wait for completion...") |
...ernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala
Show resolved
Hide resolved
| KUBERNETES_RESOURCE_PREFIX) | ||
| submissionClient.run() | ||
| verify(loggingPodStatusWatcher).awaitCompletion() | ||
| verify(loggingPodStatusWatcher).watchOrStop("default:driver") |
There was a problem hiding this comment.
default -> kubernetesConf.namespace()?
dongjoon-hyun
left a comment
There was a problem hiding this comment.
Thank you for updating, @jkleckner . I left some comments to match this to the other branches.
|
Test build #131105 has finished for PR 30283 at commit
|
| } | ||
|
|
||
| override def watchOrStop(sId: String): Boolean = if (waitForCompletion) { | ||
| logInfo(s"Waiting for application ${conf.appName} with submission ID $sId to finish...") |
There was a problem hiding this comment.
Oh, it's a compilation error.
[error] /home/jenkins/workspace/SparkPullRequestBuilder@2/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala:195: object appName is not a member of package conf
[error] logInfo(s"Waiting for application ${conf.appName} with submission ID $sId to finish...")
[error] ^
[error] one error foundThere was a problem hiding this comment.
Can you confirm that this is the way you want it?
| override def onClose(e: KubernetesClientException): Unit = { | ||
| logDebug(s"Stopping watching application $appId with last-observed phase $phase") | ||
| closeWatch() | ||
| if (e != null && e.getCode==HTTP_GONE) { |
There was a problem hiding this comment.
Missed this one somehow. Updated now.
dongjoon-hyun
left a comment
There was a problem hiding this comment.
For LoggingPodStatusWatcherImpl, I missed that it doesn't have conf which is different from the other branch class LoggingPodStatusWatcherImpl(conf: KubernetesDriverConf). Sorry about that. Could you fix the compilation error?
|
@dongjoon-hyun I made the changes. No force-push this time. Sorry to take so long but I did get very very busy. |
|
Test build #131385 has finished for PR 30283 at commit
|
b4cc870 to
1c64c6c
Compare
|
Test build #131388 has finished for PR 30283 at commit
|
|
Kubernetes integration test starting |
|
Kubernetes integration test status success |
|
Kubernetes integration test starting |
|
Kubernetes integration test starting |
|
Kubernetes integration test status failure |
|
Kubernetes integration test status success |
| val watcher = new LoggingPodStatusWatcherImpl(kubernetesAppId, loggingInterval) | ||
| val watcher = new LoggingPodStatusWatcherImpl(kubernetesAppId, | ||
| loggingInterval, | ||
| waitForAppCompletion) |
There was a problem hiding this comment.
Please revert this change. This is inconsistent with Apache Spark 3.1 and 3.0.
| appId: String, | ||
| maybeLoggingInterval: Option[Long]) | ||
| maybeLoggingInterval: Option[Long], | ||
| waitForCompletion: Boolean) |
There was a problem hiding this comment.
Please revert this change. This is inconsistent with Apache Spark 3.1 and 3.0.
… changed from k8s ### What changes were proposed in this pull request? This patch processes the HTTP Gone event and restarts the pod watcher. ### Why are the changes needed? This is a backport of PR #28423 to branch-2.4. The reasons are explained in SPARK-24266 that spark jobs using the k8s resource scheduler may hang. ### Does this PR introduce any user-facing change? No. ### How was this patch tested? Manually. Closes #30283 from jkleckner/shockdm-2.4.6-spark-submit-fix. Lead-authored-by: Jim Kleckner <jim@cloudphysics.com> Co-authored-by: Dmitriy Drinfeld <dmitriy.drinfeld@ibm.com> Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
dongjoon-hyun
left a comment
There was a problem hiding this comment.
Thank you, @jkleckner and all.
+1, LGTM. Merged to branch-2.4 for Apache Spark 2.4.8.
|
Thank you, @dongjoon-hyun ! |
What changes were proposed in this pull request?
This patch processes the HTTP Gone event and restarts the pod watcher.
Why are the changes needed?
This is a backport of PR #28423 to branch-2.4.
The reasons are explained in SPARK-24266 that spark jobs using the k8s resource scheduler may hang.
Does this PR introduce any user-facing change?
No.
How was this patch tested?
Manually.